package com.ls.fw.data.support.conf;

import java.util.ArrayList;
import java.util.List;

import org.apache.curator.framework.api.ACLProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.KerberosName;

/**
 * ACLProvider that sets all ACLs to be owned by a configurable user
 * (typically hbase) via sasl auth, and readable by world.
 */
public class SaslZkACLProvider implements ACLProvider {
  private final String saslUser;
  private final List<ACL> defaultACLs;
  private final List<ACL> indexerProcessACLs;
  private final String zkBaseNode;

  public SaslZkACLProvider(Configuration conf, String hostname) throws Exception {
    this.saslUser = getPrincipalName(conf, hostname);
    defaultACLs = new ArrayList<ACL>();
    defaultACLs.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", saslUser)));

    // the indexer process znodes have to be readable for service discovery
    indexerProcessACLs = new ArrayList<ACL>(defaultACLs);
    indexerProcessACLs.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    this.zkBaseNode = conf.get(ConfKeys.ZK_ROOT_NODE) + "/indexerprocess";
  }

  @Override
  public List<ACL> getAclForPath(String path) {
    if (isIndexerProcessZNode(path)) {
      return indexerProcessACLs;
    } else {
      return getDefaultAcl();
    }
  }

  @Override
  public List<ACL> getDefaultAcl() {
    return defaultACLs;
  }

  private String getPrincipalName(Configuration conf, String hostname) throws Exception {
    // essentially running as an HBase RegionServer
    String principalProp = conf.get("hbase.regionserver.kerberos.principal");
    if (principalProp != null) {
      String princ = SecurityUtil.getServerPrincipal(principalProp, hostname);
      KerberosName kerbName = new KerberosName(princ);
      return kerbName.getShortName();
    }
    return "hbase";
  }

  private boolean isIndexerProcessZNode(String path) {
    return (path != null && (path.equals(zkBaseNode) || path.startsWith(zkBaseNode + "/")));
  }
}
